// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`WindowUDF`]: User Defined Window Functions

use arrow::compute::SortOptions;
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::{
    any::Any,
    fmt::{self, Debug, Display, Formatter},
    sync::Arc,
};

use arrow::datatypes::{DataType, FieldRef};

use crate::expr::WindowFunction;
use crate::udf_eq::UdfEq;
use crate::{
    Expr, PartitionEvaluator, Signature, function::WindowFunctionSimplification,
};
use datafusion_common::{Result, not_impl_err};
use datafusion_doc::Documentation;
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Logical representation of a user-defined window function (UDWF).
///
/// A Window Function is called via the SQL `OVER` clause:
///
/// ```sql
/// SELECT first_value(col) OVER (PARTITION BY a, b ORDER BY c) FROM foo;
/// ```
///
/// A UDWF is different from a user defined function (UDF) in that it is
/// stateful across batches.
///
/// See the documentation on [`PartitionEvaluator`] for more details
///
/// 1. For simple use cases, use [`create_udwf`] (examples in
///    [`simple_udwf.rs`]).
///
/// 2. For advanced use cases, use [`WindowUDFImpl`] which provides full API
///    access (examples in [`advanced_udwf.rs`]).
///
/// # API Note
/// This is a separate struct from `WindowUDFImpl` to maintain backwards
/// compatibility with the older API.
///
/// [`PartitionEvaluator`]: crate::PartitionEvaluator
/// [`create_udwf`]: crate::expr_fn::create_udwf
/// [`simple_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/simple_udwf.rs
/// [`advanced_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs
#[derive(Debug, Clone, PartialOrd)]
pub struct WindowUDF {
    inner: Arc<dyn WindowUDFImpl>,
}

/// Defines how the WindowUDF is shown to users
impl Display for WindowUDF {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "{}", self.name())
    }
}

impl PartialEq for WindowUDF {
    fn eq(&self, other: &Self) -> bool {
        self.inner.dyn_eq(other.inner.as_any())
    }
}

impl Eq for WindowUDF {}

impl Hash for WindowUDF {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.inner.dyn_hash(state)
    }
}

impl WindowUDF {
    /// Create a new `WindowUDF` from a `[WindowUDFImpl]` trait object
    ///
    /// Note this is the same as using the `From` impl (`WindowUDF::from`)
    pub fn new_from_impl<F>(fun: F) -> WindowUDF
    where
        F: WindowUDFImpl + 'static,
    {
        Self::new_from_shared_impl(Arc::new(fun))
    }

    /// Create a new `WindowUDF` from a `[WindowUDFImpl]` trait object
    pub fn new_from_shared_impl(fun: Arc<dyn WindowUDFImpl>) -> WindowUDF {
        Self { inner: fun }
    }

    /// Return the underlying [`WindowUDFImpl`] trait object for this function
    pub fn inner(&self) -> &Arc<dyn WindowUDFImpl> {
        &self.inner
    }

    /// Adds additional names that can be used to invoke this function, in
    /// addition to `name`
    ///
    /// If you implement [`WindowUDFImpl`] directly you should return aliases directly.
    pub fn with_aliases(self, aliases: impl IntoIterator<Item = &'static str>) -> Self {
        Self::new_from_impl(AliasedWindowUDFImpl::new(Arc::clone(&self.inner), aliases))
    }

    /// creates a [`Expr`] that calls the window function with default
    /// values for `order_by`, `partition_by`, `window_frame`.
    ///
    /// See [`ExprFunctionExt`] for details on setting these values.
    ///
    /// This utility allows using a user defined window function without
    /// requiring access to the registry, such as with the DataFrame API.
    ///
    /// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
    pub fn call(&self, args: Vec<Expr>) -> Expr {
        let fun = crate::WindowFunctionDefinition::WindowUDF(Arc::new(self.clone()));

        Expr::from(WindowFunction::new(fun, args))
    }

    /// Returns this function's name
    ///
    /// See [`WindowUDFImpl::name`] for more details.
    pub fn name(&self) -> &str {
        self.inner.name()
    }

    /// Returns the aliases for this function.
    pub fn aliases(&self) -> &[String] {
        self.inner.aliases()
    }

    /// Returns this function's signature (what input types are accepted)
    ///
    /// See [`WindowUDFImpl::signature`] for more details.
    pub fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    /// Do the function rewrite
    ///
    /// See [`WindowUDFImpl::simplify`] for more details.
    pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
        self.inner.simplify()
    }

    /// Expressions that are passed to the [`PartitionEvaluator`].
    ///
    /// See [`WindowUDFImpl::expressions`] for more details.
    pub fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
        self.inner.expressions(expr_args)
    }
    /// Return a `PartitionEvaluator` for evaluating this window function
    pub fn partition_evaluator_factory(
        &self,
        partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator(partition_evaluator_args)
    }

    /// Returns the field of the final result of evaluating this window function.
    ///
    /// See [`WindowUDFImpl::field`] for more details.
    pub fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
        self.inner.field(field_args)
    }

    /// Returns custom result ordering introduced by this window function
    /// which is used to update ordering equivalences.
    ///
    /// See [`WindowUDFImpl::sort_options`] for more details.
    pub fn sort_options(&self) -> Option<SortOptions> {
        self.inner.sort_options()
    }

    /// See [`WindowUDFImpl::coerce_types`] for more details.
    pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }

    /// Returns the reversed user-defined window function when the
    /// order of evaluation is reversed.
    ///
    /// See [`WindowUDFImpl::reverse_expr`] for more details.
    pub fn reverse_expr(&self) -> ReversedUDWF {
        self.inner.reverse_expr()
    }

    /// Returns the documentation for this Window UDF.
    ///
    /// Documentation can be accessed programmatically as well as
    /// generating publicly facing documentation.
    pub fn documentation(&self) -> Option<&Documentation> {
        self.inner.documentation()
    }
}

impl<F> From<F> for WindowUDF
where
    F: WindowUDFImpl + Send + Sync + 'static,
{
    fn from(fun: F) -> Self {
        Self::new_from_impl(fun)
    }
}

/// Trait for implementing [`WindowUDF`].
///
/// This trait exposes the full API for implementing user defined window functions and
/// can be used to implement any function.
///
/// While the trait depends on [`DynEq`] and [`DynHash`] traits, these should not be
/// implemented directly. Instead, implement [`Eq`] and [`Hash`] and leverage the
/// blanket implementations of [`DynEq`] and [`DynHash`].
///
/// See [`advanced_udwf.rs`] for a full example with complete implementation and
/// [`WindowUDF`] for other available options.
///
///
/// [`advanced_udwf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs
/// # Basic Example
/// ```
/// # use std::any::Any;
/// # use std::sync::LazyLock;
/// # use arrow::datatypes::{DataType, Field, FieldRef};
/// # use datafusion_common::{DataFusionError, plan_err, Result};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, LimitEffect};
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
/// # use datafusion_functions_window_common::field::WindowUDFFieldArgs;
/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL;
/// # use datafusion_physical_expr_common::physical_expr;
/// # use std::sync::Arc;
///
/// #[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// struct SmoothIt {
///   signature: Signature,
/// }
///
/// impl SmoothIt {
///   fn new() -> Self {
///     Self {
///       signature: Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable),
///      }
///   }
/// }
///
/// static DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
///     Documentation::builder(DOC_SECTION_ANALYTICAL, "smooths the windows", "smooth_it(2)")
///         .with_argument("arg1", "The int32 number to smooth by")
///         .build()
/// });
///
/// fn get_doc() -> &'static Documentation {
///     &DOCUMENTATION
/// }
///
/// /// Implement the WindowUDFImpl trait for SmoothIt
/// impl WindowUDFImpl for SmoothIt {
///    fn as_any(&self) -> &dyn Any { self }
///    fn name(&self) -> &str { "smooth_it" }
///    fn signature(&self) -> &Signature { &self.signature }
///    // The actual implementation would smooth the window
///    fn partition_evaluator(
///        &self,
///        _partition_evaluator_args: PartitionEvaluatorArgs,
///    ) -> Result<Box<dyn PartitionEvaluator>> {
///        unimplemented!()
///    }
///    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
///      if let Some(DataType::Int32) = field_args.get_input_field(0).map(|f| f.data_type().clone()) {
///        Ok(Field::new(field_args.name(), DataType::Int32, false).into())
///      } else {
///        plan_err!("smooth_it only accepts Int32 arguments")
///      }
///    }
///    fn documentation(&self) -> Option<&Documentation> {
///      Some(get_doc())
///    }
///     fn limit_effect(&self, _args: &[Arc<dyn physical_expr::PhysicalExpr>]) -> LimitEffect {
///         LimitEffect::Unknown
///     }
/// }
///
/// // Create a new WindowUDF from the implementation
/// let smooth_it = WindowUDF::from(SmoothIt::new());
///
/// // Call the function `add_one(col)`
/// // smooth_it(speed) OVER (PARTITION BY car ORDER BY time ASC)
/// let expr = smooth_it.call(vec![col("speed")])
///     .partition_by(vec![col("car")])
///     .order_by(vec![col("time").sort(true, true)])
///     .window_frame(WindowFrame::new(None))
///     .build()
///     .unwrap();
/// ```
pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync {
    /// Returns this object as an [`Any`] trait object
    fn as_any(&self) -> &dyn Any;

    /// Returns this function's name
    fn name(&self) -> &str;

    /// Returns any aliases (alternate names) for this function.
    ///
    /// Note: `aliases` should only include names other than [`Self::name`].
    /// Defaults to `[]` (no aliases)
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Returns the function's [`Signature`] for information about what input
    /// types are accepted and the function's Volatility.
    fn signature(&self) -> &Signature;

    /// Returns the expressions that are passed to the [`PartitionEvaluator`].
    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
        expr_args.input_exprs().into()
    }

    /// Invoke the function, returning the [`PartitionEvaluator`] instance
    fn partition_evaluator(
        &self,
        partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>>;

    /// Optionally apply per-UDWF simplification / rewrite rules.
    ///
    /// This can be used to apply function specific simplification rules during
    /// optimization. The default implementation does nothing.
    ///
    /// Note that DataFusion handles simplifying arguments and  "constant
    /// folding" (replacing a function call with constant arguments such as
    /// `my_add(1,2) --> 3` ). Thus, there is no need to implement such
    /// optimizations manually for specific UDFs.
    ///
    /// Example:
    /// `advanced_udwf.rs`: <https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udwf.rs>
    ///
    /// # Returns
    /// [None] if simplify is not defined or,
    ///
    /// Or, a closure with two arguments:
    /// * 'window_function': [crate::expr::WindowFunction] for which simplified has been invoked
    /// * 'info': [crate::simplify::SimplifyInfo]
    ///
    /// # Notes
    /// The returned expression must have the same schema as the original
    /// expression, including both the data type and nullability. For example,
    /// if the original expression is nullable, the returned expression must
    /// also be nullable, otherwise it may lead to schema verification errors
    /// later in query planning.
    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        None
    }

    /// The [`FieldRef`] of the final result of evaluating this window function.
    ///
    /// Call `field_args.name()` to get the fully qualified name for defining
    /// the [`FieldRef`]. For a complete example see the implementation in the
    /// [Basic Example](WindowUDFImpl#basic-example) section.
    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef>;

    /// Allows the window UDF to define a custom result ordering.
    ///
    /// By default, a window UDF doesn't introduce an ordering.
    /// But when specified by a window UDF this is used to update
    /// ordering equivalences.
    fn sort_options(&self) -> Option<SortOptions> {
        None
    }

    /// Coerce arguments of a function call to types that the function can evaluate.
    ///
    /// This function is only called if [`WindowUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most
    /// UDWFs should return one of the other variants of `TypeSignature` which handle common
    /// cases
    ///
    /// See the [type coercion module](crate::type_coercion)
    /// documentation for more details on type coercion
    ///
    /// For example, if your function requires a floating point arguments, but the user calls
    /// it like `my_func(1::int)` (aka with `1` as an integer), coerce_types could return `[DataType::Float64]`
    /// to ensure the argument was cast to `1::double`
    ///
    /// # Parameters
    /// * `arg_types`: The argument types of the arguments  this function with
    ///
    /// # Return value
    /// A Vec the same length as `arg_types`. DataFusion will `CAST` the function call
    /// arguments to these specific types.
    fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
        not_impl_err!("Function {} does not implement coerce_types", self.name())
    }

    /// Allows customizing the behavior of the user-defined window
    /// function when it is evaluated in reverse order.
    fn reverse_expr(&self) -> ReversedUDWF {
        ReversedUDWF::NotSupported
    }

    /// Returns the documentation for this Window UDF.
    ///
    /// Documentation can be accessed programmatically as well as
    /// generating publicly facing documentation.
    fn documentation(&self) -> Option<&Documentation> {
        None
    }

    /// If not causal, returns the effect this function will have on the window
    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
        LimitEffect::Unknown
    }
}

/// the effect this function will have on the limit pushdown
pub enum LimitEffect {
    /// Does not affect the limit (i.e. this is causal)
    None,
    /// Either undeclared, or dynamic (only evaluatable at run time)
    Unknown,
    /// Grow the limit by N rows
    Relative(usize),
    /// Limit needs to be at least N rows
    Absolute(usize),
}

pub enum ReversedUDWF {
    /// The result of evaluating the user-defined window function
    /// remains identical when reversed.
    Identical,
    /// A window function which does not support evaluating the result
    /// in reverse order.
    NotSupported,
    /// Customize the user-defined window function for evaluating the
    /// result in reverse order.
    Reversed(Arc<WindowUDF>),
}

impl PartialEq for dyn WindowUDFImpl {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other.as_any())
    }
}

impl PartialOrd for dyn WindowUDFImpl {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.name().partial_cmp(other.name()) {
            Some(Ordering::Equal) => self.signature().partial_cmp(other.signature()),
            cmp => cmp,
        }
        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
        .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}

/// WindowUDF that adds an alias to the underlying function. It is better to
/// implement [`WindowUDFImpl`], which supports aliases, directly if possible.
#[derive(Debug, PartialEq, Eq, Hash)]
struct AliasedWindowUDFImpl {
    inner: UdfEq<Arc<dyn WindowUDFImpl>>,
    aliases: Vec<String>,
}

impl AliasedWindowUDFImpl {
    pub fn new(
        inner: Arc<dyn WindowUDFImpl>,
        new_aliases: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        let mut aliases = inner.aliases().to_vec();
        aliases.extend(new_aliases.into_iter().map(|s| s.to_string()));

        Self {
            inner: inner.into(),
            aliases,
        }
    }
}

#[warn(clippy::missing_trait_methods)] // Delegates, so it should implement every single trait method
impl WindowUDFImpl for AliasedWindowUDFImpl {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        self.inner.name()
    }

    fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
        expr_args
            .input_exprs()
            .first()
            .map_or(vec![], |expr| vec![Arc::clone(expr)])
    }

    fn partition_evaluator(
        &self,
        partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator(partition_evaluator_args)
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        self.inner.simplify()
    }

    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
        self.inner.field(field_args)
    }

    fn sort_options(&self) -> Option<SortOptions> {
        self.inner.sort_options()
    }

    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }

    fn reverse_expr(&self) -> ReversedUDWF {
        self.inner.reverse_expr()
    }

    fn documentation(&self) -> Option<&Documentation> {
        self.inner.documentation()
    }

    fn limit_effect(&self, args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
        self.inner.limit_effect(args)
    }
}

#[cfg(test)]
mod test {
    use crate::{LimitEffect, PartitionEvaluator, WindowUDF, WindowUDFImpl};
    use arrow::datatypes::{DataType, FieldRef};
    use datafusion_common::Result;
    use datafusion_expr_common::signature::{Signature, Volatility};
    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
    use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use std::any::Any;
    use std::cmp::Ordering;
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::Arc;

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct AWindowUDF {
        signature: Signature,
    }

    impl AWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::uniform(
                    1,
                    vec![DataType::Int32],
                    Volatility::Immutable,
                ),
            }
        }
    }

    /// Implement the WindowUDFImpl trait for AddOne
    impl WindowUDFImpl for AWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn name(&self) -> &str {
            "a"
        }
        fn signature(&self) -> &Signature {
            &self.signature
        }
        fn partition_evaluator(
            &self,
            _partition_evaluator_args: PartitionEvaluatorArgs,
        ) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }
        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
            unimplemented!()
        }

        fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
            LimitEffect::Unknown
        }
    }

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct BWindowUDF {
        signature: Signature,
    }

    impl BWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::uniform(
                    1,
                    vec![DataType::Int32],
                    Volatility::Immutable,
                ),
            }
        }
    }

    /// Implement the WindowUDFImpl trait for AddOne
    impl WindowUDFImpl for BWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn name(&self) -> &str {
            "b"
        }
        fn signature(&self) -> &Signature {
            &self.signature
        }
        fn partition_evaluator(
            &self,
            _partition_evaluator_args: PartitionEvaluatorArgs,
        ) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }
        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
            unimplemented!()
        }

        fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
            LimitEffect::Unknown
        }
    }

    #[test]
    fn test_partial_eq() {
        let a1 = WindowUDF::from(AWindowUDF::new());
        let a2 = WindowUDF::from(AWindowUDF::new());
        let eq = a1 == a2;
        assert!(eq);
        assert_eq!(a1, a2);
        assert_eq!(hash(a1), hash(a2));
    }

    #[test]
    fn test_partial_ord() {
        let a1 = WindowUDF::from(AWindowUDF::new());
        let a2 = WindowUDF::from(AWindowUDF::new());
        assert_eq!(a1.partial_cmp(&a2), Some(Ordering::Equal));

        let b1 = WindowUDF::from(BWindowUDF::new());
        assert!(a1 < b1);
        assert!(!(a1 == b1));
    }

    fn hash<T: Hash>(value: T) -> u64 {
        let hasher = &mut DefaultHasher::new();
        value.hash(hasher);
        hasher.finish()
    }
}
